/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.oracle;

import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.pipeline.AbstractMetricsTest;
import io.debezium.util.Testing;

/**
 * Integration test for the metrics of the Oracle connector.
 * <p>
 * The test scenarios themselves are inherited from {@link AbstractMetricsTest}; this class only
 * supplies the Oracle-specific hooks (connector identity, configuration, test data) and manages
 * the {@code debezium.customer} table the hooks operate on.
 */
public class OracleMetricsIT extends AbstractMetricsTest<OracleConnector> {

    /** Connection shared by all tests of this class; opened in {@link #beforeClass()}. */
    private static OracleConnection connection;

    /**
     * @return the connector implementation under test
     */
    @Override
    protected Class<OracleConnector> getConnectorClass() {
        return OracleConnector.class;
    }

    /**
     * @return the connector name, taken from {@link TestHelper#CONNECTOR_NAME}
     */
    @Override
    protected String connector() {
        return TestHelper.CONNECTOR_NAME;
    }

    /**
     * @return the logical server name, taken from {@link TestHelper#SERVER_NAME}
     */
    @Override
    protected String server() {
        return TestHelper.SERVER_NAME;
    }

    /**
     * Builds the connector configuration used when a snapshot is wanted.
     *
     * @return the default test configuration with the snapshot mode set to {@code INITIAL}
     */
    @Override
    protected Configuration.Builder config() {
        return TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL);
    }

    /**
     * Switches the given configuration to the {@code NO_DATA} snapshot mode.
     *
     * @param config the configuration builder to adjust
     * @return the builder returned by {@code with(...)} after applying the snapshot mode
     */
    @Override
    protected Configuration.Builder noSnapshot(Configuration.Builder config) {
        return config.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA);
    }

    /**
     * Inserts two rows into {@code debezium.customer} and then issues a single {@code COMMIT}.
     * <p>
     * {@code NULL} is passed for the {@code id} column, which the table DDL declares as
     * {@code GENERATED BY DEFAULT ON NULL AS IDENTITY}.
     *
     * @throws SQLException if any of the statements fails
     */
    @Override
    protected void executeInsertStatements() throws SQLException {
        connection.executeWithoutCommitting("INSERT INTO debezium.customer VALUES (NULL, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.executeWithoutCommitting("INSERT INTO debezium.customer VALUES (NULL, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
    }

    /**
     * @return the identifier of the customer table, prefixed with the database name reported by
     *         {@link TestHelper#getDatabaseName()}
     */
    @Override
    protected String tableName() {
        return TestHelper.getDatabaseName() + ".DEBEZIUM.CUSTOMER";
    }

    /**
     * @return {@code 2}; presumably this mirrors the two rows written by
     *         {@link #executeInsertStatements()} - confirm against the base class
     */
    @Override
    protected long expectedEvents() {
        return 2L;
    }

    /**
     * @return always {@code true}; NOTE(review): the exact meaning of this flag is defined by
     *         {@link AbstractMetricsTest} and is not visible here
     */
    @Override
    protected boolean snapshotCompleted() {
        return true;
    }

    /**
     * Opens the shared connection and (re)creates the {@code debezium.customer} table, grants
     * {@code SELECT} on it to the connector user and enables supplemental logging of all columns.
     *
     * @throws SQLException if any of the statements fails
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();

        TestHelper.dropAllTables();

        // Explicit drop in addition to dropAllTables(); kept as-is since the exact scope of
        // dropAllTables() is not visible from this class.
        TestHelper.dropTable(connection, "debezium.customer");

        String ddl = "create table debezium.customer (" +
                "  id numeric(9,0) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) not null, " +
                "  name varchar2(1000), " +
                "  score decimal(6, 2), " +
                "  registered timestamp, " +
                "  primary key (id)" +
                ")";

        connection.execute(ddl);
        connection.execute("GRANT SELECT ON debezium.customer to  " + TestHelper.getConnectorUserName());
        connection.execute("ALTER TABLE debezium.customer ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
    }

    /**
     * Drops the {@code debezium.customer} table and closes the shared connection.
     * <p>
     * Does nothing if the connection was never opened.
     *
     * @throws SQLException if dropping the table or closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            try {
                TestHelper.dropTable(connection, "debezium.customer");
            }
            finally {
                // Close even when the drop fails, so a failed clean-up does not leak the session.
                connection.close();
            }
        }
    }

    /**
     * Per-test set-up: empties the customer table, configures the consume timeout, initializes
     * the connector test framework and deletes the file at {@link TestHelper#SCHEMA_HISTORY_PATH}.
     *
     * @throws SQLException if clearing the table fails
     */
    @Before
    public void before() throws SQLException {
        connection.execute("delete from debezium.customer");
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }
}
